Kafka consumer原理剖析

Blockquote

REF

consumer的新特性

在 0.9.0.0 之后的 Kafka,出现了几个新变动,一个是在 Server 端增加了 GroupCoordinator 这个角色,另一个较大的变动是将 topic 的 offset 信息由之前存储在 zookeeper 上改为存储到一个特殊的 topic 中(__consumer_offsets)。

__consumer_offsets

consumer_offsets 是 Kafka 内部使用的一个 topic,专门用来存储 group 消费的情况,默认情况下有50个 partition,每个 partition 三副本,而具体 group 的消费情况要存储到哪一个 partition 上,是根据 abs(GroupId.hashCode()) % NumPartitions 来计算(其中,NumPartitions 是consumer_offsets 的 partition 数,默认是50个)的。

offset

  • Last Commiteed Offset:consumer上一次commit的位置;
  • Current Position:cosumer当前消费到的位置,last coomitted offset 到current position之间的就是当前正被consumer处理的消息。
  • High Watermark:被成功备份到所有replicas的最新位置,该位置之前的所消息都被认为是安全可消费的。
  • Log End Offset:Producer 写入到 Kafka 中的最新一条数据的 offset;

coordinator机制

kafka server将partiton分配的工作转移到了Client上(Producer中也可以看到),server保留的是group的分配工作,这样的设计是为了方便client使用灵活的partition分配方案。

coordinator in server

server上的Coordinator 负责reblance、Offset提交、心跳,实现主要代码在kafka.coordinator.GroupCoordinator.scala

一个consumer group对应一个coordinator

coordinator 状态机

共有5种状态:

  1. Dead:group中没有成员,并且metadata已被移除,这种状态响应各种请求都是一个response: UNKNOWN_MEMBER_ID
  2. Empty:Group 没有任何成员,如果所有的 offsets 都过期的话就会变成 Dead,一般当 Group 新创建时是这个状态,也有可能这个 Group 仅仅用于 offset commits 并没有任何成员,该状态至响应JoinGroupRequest
  3. Stable:这种状态下,coordinator已经获得了激活的generation,或者目前没有成员,等待第一个joinGroup。该状态还会接受成员的heartbeats。
  4. PreparingRebalance:准备重平衡状态,例如member发生变化
  5. AwaitingSync:所有的joinGroup请求都接受到后,会选举产生一个leader,这个状态就是在等待leader发送partition的分配结果(SyncGroupRequest)。

状态机如下:

coordinator in client

根据KafkaConsumer主要方法pollOnce来跟踪client上的coordinator工作过程

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {

coordinator.ensureCoordinatorReady();

if (subscriptions.partitionsAutoAssigned())
coordinator.ensurePartitionAssignment();

if (!subscriptions.hasAllFetchPositions())
updateFetchPositions(this.subscriptions.missingFetchPositions());

long now = time.milliseconds();
client.executeDelayedTasks(now);

Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
if (!records.isEmpty())
return records;

fetcher.sendFetches();
client.poll(timeout, now);
return fetcher.fetchedRecords();
}

第一步、投石问路——确保server端有可用的coordinator

public void ensureCoordinatorReady() {
//通过与节点建立连接判断coordinator是否存活
while (coordinatorUnknown()) {
//发送GroupCoordinatorRequest请求
RequestFuture<Void> future = sendGroupCoordinatorRequest();
client.poll(future);

if (future.failed()) {
if (future.isRetriable())
client.awaitMetadataUpdate();
else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
coordinatorDead();
time.sleep(retryBackoffMs);
}

}
}

如果请求有broker响应了,那么将将该节点做为coordinator:

this.coordinator = new Node(Integer.MAX_VALUE - groupCoordinatorResponse.node().id(),
groupCoordinatorResponse.node().host(),
groupCoordinatorResponse.node().port());

第二步、确保group是可用的

首先,对group需要reJoin的情况进行梳理:

  • 有consumer离开当前group,client会发送一个LeaveGroupRequest如:
  • 不再订阅某个topic
  • ConsumerCoordinator执行关闭操作
  • 发送SyncGroupRequest后收到的response异常
  • 发送HeartbeatRequest后收到的response异常,包括:REBALANCE_IN_PROGRESS(正在重平衡),ILLEGAL_GENERATION(generation值不合法),UNKNOWN_MEMBER_ID(未知的成员)
    public void ensureActiveGroup() {
    if (!needRejoin()) return;

    //如果设置了auto commit,那么在rebalance之前先提交,再准备reJoin
    if (needsJoinPrepare) {
    onJoinPrepare(generation, memberId);
    needsJoinPrepare = false;
    }

    while (needRejoin()) {
    ensureCoordinatorReady();

    //在reblance执行之前,需要确保所有JoinGroup的请求都被处理掉了,避免频繁的reblance
    if (client.pendingRequestCount(this.coordinator) > 0) {
    client.awaitPendingRequests(this.coordinator);
    continue;
    }

    RequestFuture<ByteBuffer> future = sendJoinGroupRequest();
    future.addListener(new RequestFutureListener<ByteBuffer>() {
    @Override
    public void onSuccess(ByteBuffer value) {
    onJoinComplete(generation, memberId, protocol, value);
    needsJoinPrepare = true;
    heartbeatTask.reset();
    }
    });
    client.poll(future);
    }
    }

JoinGroupRequest中包含的信息有:

  • groupId
  • memberId
  • subscriptions && PartitionAssignor(默认:RangeAssignor)

第三步、处理JoinGroupResponse

这是通过回调函数实现的,具体的是JoinGroupResponseHandler的handle方法:

public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
Errors error = Errors.forCode(joinResponse.errorCode());
if (error == Errors.NONE) {
//记录新的generation
AbstractCoordinator.this.generation = joinResponse.generationId();
AbstractCoordinator.this.rejoinNeeded = false;
//leader与follower区别对待
if (joinResponse.isLeader()) {
onJoinLeader(joinResponse).chain(future);
} else {
onJoinFollower().chain(future);
}
} else if {
...
}
}

server的coordinator在收到joinGroupRequest后,会为每个group组选择一个member任命为leader。

leader在收到response后,会进行partition的分配,并且将分配结果发送给server的coordinator

private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.leaderId(), joinResponse.groupProtocol(),
joinResponse.members());
SyncGroupRequest request = new SyncGroupRequest(groupId, generation, memberId, groupAssignment);
return sendSyncGroupRequest(request);
}

分区分配的逻辑:

protected Map<String, ByteBuffer> performAssignment(String leaderId,
String assignmentStrategy,
Map<String, ByteBuffer> allSubscriptions) {
//获取分配规则(默认range)
PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
Set<String> allSubscribedTopics = new HashSet<>();
Map<String, Subscription> subscriptions = new HashMap<>();
//获取订阅的topic
for (Map.Entry<String, ByteBuffer> subscriptionEntry : allSubscriptions.entrySet()) {
Subscription subscription = ConsumerProtocol.deserializeSubscription(subscriptionEntry.getValue());
subscriptions.put(subscriptionEntry.getKey(), subscription);
allSubscribedTopics.addAll(subscription.topics());
}

this.subscriptions.groupSubscribe(allSubscribedTopics);
metadata.setTopics(this.subscriptions.groupSubscription());

//对每个订阅的topic进行分区分配
Map<String, Assignment> assignment = assignor.assign(metadata.fetch(), subscriptions);

Map<String, ByteBuffer> groupAssignment = new HashMap<>();
for (Map.Entry<String, Assignment> assignmentEntry : assignment.entrySet()) {
ByteBuffer buffer = ConsumerProtocol.serializeAssignment(assignmentEntry.getValue());
groupAssignment.put(assignmentEntry.getKey(), buffer);
}

return groupAssignment;
}

follower只需要发送一个不包含分区结果的SyncGroupRequest

private RequestFuture<ByteBuffer> onJoinFollower() {
SyncGroupRequest request = new SyncGroupRequest(groupId, generation,
memberId, Collections.<String, ByteBuffer>emptyMap());
return sendSyncGroupRequest(request);
}

关闭KafkaListnerContainer时发生了些什么

一共三个操作:

if(this.listenerInvokerFuture != null) {
this.stopInvokerAndCommitManualAcks();
}

try {
this.consumer.unsubscribe();
} catch (WakeupException var8) {
;
}

this.consumer.close();

主要看unsubscribe方法:

this.subscriptions.unsubscribe();
this.coordinator.maybeLeaveGroup();
this.metadata.needMetadataForAllTopics(false);

  • 首先取消了所有订阅
  • 然后发送一个LeaveGroupRequest,并且将memberId设为UNKNOWN,needRejoin设为true

当所有member离开时,server的coordinator进入Empty状态

小实验

创建两个ListenerContainer,订阅同一个topic(“test” with partitions=6),并且在同一个group内:

ConcurrentMessageListenerContainer testContainer = new ContainerBuilder()
// .setTopic("kafka.scanAllBackup")
.setTopic("test")
.setGroupId("g1")
.setListenerName("testListener")
.setBeanName("testContainer")
...
ConcurrentMessageListenerContainer secondContainer = new ContainerBuilder()
// .setTopic("kafka.scanAllBackup")
.setTopic("test")
.setGroupId("g1")
.setListenerName("testListener")
.setBeanName("secondContainer")

实验结果

开启第一个Container

此时的generation = 1,分配的分区数为6
开启第二个Container

因为有新的member加入,因此触发了Rebalance,根据Range分配规则,每个consumer获得3个分区

简述下Range Assignor规则:假如topic有N个分区(按number排序),group组内有M个consumer(按字典序排列)订阅,那么就现将分区分成M份,每份N/M个,如果不能整除,就将余数(N%M)分配给前N%M个consumer

关闭第一个Container

有member离开group,再次触发Reblance,第二个container独享6个分区

实验二

在使用kafka的时候有个现象:如果将正在消费的consumer关闭、重启,那么在短时间内他是无法接收到消息的,从日志上看得话就是server coordinator没有为这个consumer分配分区,为了详解这种机制,我将发送JoinGroup的debug信息输出。测试用例同上
开启第一个container

在6分09秒发送了一个joinGroup请求,但是并没有得到反馈。

g1这个group在server中的generation=1,因此新的joinGroup请求进来后,server进入PreparingRebalance状态。

开启第二个Container,关闭第一个container

发送了第二个joinGroup请求,并没有马上收到反馈,在6分29秒关闭了第一个container,经过了96s后,收到了反馈,并且指定leader为client2(也就是当前的consumer),client1是member,成功分配到了3个分区。

在10分06秒的时候,server再次执行了重平衡,client2再次发送了joinGroup请求,马上得到了反馈,并且这次独享6个分区。

产生这次重平衡的原因是:8分05秒server反馈了joinGroup请求,session_timeout设置的是两分钟,在10分05秒的时候,server依然未收到client1的heartbeat,因此触发了重平衡

server.log

[2017-03-02 19:06:03,197] INFO [GroupCoordinator 2]: Stabilized group g1 generation 1 (kafka.coordinator.GroupCoordinator)
[2017-03-02 19:06:03,201] INFO [GroupCoordinator 2]: Assignment received from leader for group g1 for generation 1 (kafka.coordinator.GroupCoordinator)
[2017-03-02 19:06:10,351] INFO [GroupCoordinator 2]: Preparing to restabilize group g1 with old generation 1 (kafka.coordinator.GroupCoordinator)
[2017-03-02 19:08:06,504] INFO [GroupCoordinator 2]: Stabilized group g1 generation 2 (kafka.coordinator.GroupCoordinator)
[2017-03-02 19:08:06,511] INFO [GroupCoordinator 2]: Assignment received from leader for group g1 for generation 2 (kafka.coordinator.GroupCoordinator)
[2017-03-02 19:10:06,506] INFO [GroupCoordinator 2]: Preparing to restabilize group g1 with old generation 2 (kafka.coordinator.GroupCoordinator)
[2017-03-02 19:10:06,966] INFO [GroupCoordinator 2]: Stabilized group g1 generation 3 (kafka.coordinator.GroupCoordinator)
[2017-03-02 19:10:06,970] INFO [GroupCoordinator 2]: Assignment received from leader for group g1 for generation 3 (kafka.coordinator.GroupCoordinator)
[2017-03-02 19:10:51,173] INFO [GroupCoordinator 2]: Preparing to restabilize group g1 with old generation 3 (kafka.coordinator.GroupCoordinator)
[2017-03-02 19:10:51,175] INFO [GroupCoordinator 2]: Group g1 generation 3 is dead and removed (kafka.coordinator.GroupCoordinator)
  • 6分03秒,server收到来自leader的syncGroup请求,coordinator进入Stable状态。
  • 随后,非正常关闭container,所有member离开了group,coordinator进入Empty状态 coordinator无法收到members的心跳
  • 6分10秒,client重启,并发送了joinGroup请求,memberId为UNKNOWN,server进入PreparingRebalance状态

疑点coordinator在收到client1的leaveGroup请求后为啥还会响应其joinGroup请求嘞? coordinator因为没有感知到client1的离开,所以才会长时间等待